package com.galeno.load

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.DriverManager
import scala.collection.mutable.ListBuffer

/**
 * @Title: 从数据库读取数据 (read data from a database)
 * @Description: Spark job that reads comma-separated records from a text file and
 *               enriches each record with age/phone columns looked up from MySQL via JDBC.
 * @author galeno
 * @date 2021/8/24 20:03
 */
object 从数据库读取数据 extends App {

  // Local-mode Spark job: read (id, c2, c3, c4) rows from a text file and
  // enrich each row with (age, phone) queried from the MySQL table `battel`.
  private val conf = new SparkConf
  conf.setAppName(this.getClass.getName)
  conf.setMaster("local[*]")
  private val context = new SparkContext(conf)

  // Each line of data/battel.txt is expected to be comma-separated with at
  // least 4 columns; column 0 is the lookup id.
  private val threeCountryHuman: RDD[String] = context.textFile("data/battel.txt")
  private val rdd2: RDD[(String, String, String, String)] = threeCountryHuman.map { line =>
    val cols = line.split(",")
    (cols(0), cols(1), cols(2), cols(3))
  }

  // Use mapPartitions so each partition opens ONE JDBC connection instead of
  // one per record.
  private val enriched: RDD[(String, String, String, String, String, String)] =
    rdd2.mapPartitions { iter =>
      val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "root")
      val statement = connection.prepareStatement("select age,phone from battel where id =?")
      try {
        // Materialize the partition's results eagerly (.toList) so the
        // connection can be closed before the iterator is returned to Spark.
        // The original code tried to detect exhaustion with
        // `resultit.next() == null`, which throws on an empty iterator,
        // silently drops the first row otherwise, and leaks the connection
        // on the normal path.
        val rows = iter.flatMap { tp =>
          statement.setString(1, tp._1)
          val resultSet = statement.executeQuery()
          val buffer = new ListBuffer[(String, String, String, String, String, String)]
          try {
            while (resultSet.next()) {
              val age = resultSet.getInt("age")
              val phone = resultSet.getString("phone")
              buffer += ((tp._1, tp._2, tp._3, tp._4, age.toString, phone))
            }
          } finally {
            resultSet.close()
          }
          buffer
        }.toList
        rows.iterator
      } finally {
        // Always release JDBC resources, even if a query throws.
        statement.close()
        connection.close()
      }
    }

  // BUG FIX: the original discarded the mapPartitions result (transformations
  // are lazy, so the JDBC lookup never ran) and printed the raw rdd2 instead.
  // Run the action on the enriched RDD so the join actually executes.
  enriched.foreach(println)

  context.stop()
}
